Vodafone / O2 Dwell Data Analysis#

Setup#

Imports and Functions#

import duckdb
import geopandas as gpd
import pandas as pd
import plotly.express as px
from pathlib import Path
from getpass import getpass
import h3pandas
from shapely.geometry import box
from sklearn.cluster import DBSCAN
import numpy as np
import folium

pd.options.plotting.backend = "plotly"  # set plotly as the default plotting engine for pandas
pd.set_option("future.no_silent_downcasting", True)  # opt in to pandas' future behaviour: no silent downcasting

def fetch_csv(url: str) -> pd.DataFrame:
    """Reads and parses a CSV using DuckDB, returning a Pandas DataFrame."""
    query = f"SELECT * FROM read_csv_auto('{url}');"
    with duckdb.connect(database=":memory:") as conn:
        return conn.sql(query).fetchdf()


def process_pings(df: pd.DataFrame, check_data_quality: bool = False) -> gpd.GeoDataFrame:
    """Processes the GPS pings DataFrame, returning a GeoDataFrame."""
    if check_data_quality:
        dt_dtypes = ("datetime64[ns]", "<M8[us]")
        assert df["datetime"].dtype in dt_dtypes, "Invalid datetime data type"

        assert not df.isna().any().any(), "Missing data in DataFrame"
        assert not df.duplicated().any(), "Duplicate rows in DataFrame"

        assert df["lat"].dtype == "float64", "Invalid latitude data type"
        assert df["lon"].dtype == "float64", "Invalid longitude data type"

        assert df["lat"].between(-90, 90).all(), "Latitude values outside of valid range"
        assert df["lon"].between(-180, 180).all(), "Longitude values outside of valid range"

    geometry = lambda df: gpd.points_from_xy(df["lon"], df["lat"], crs="EPSG:4326")
    rename_cols = {"datetime": "pinged_at"}
    drop_cols = ["lat", "lon"]

    return (
        df.assign(geometry=geometry)
        .drop(columns=drop_cols)
        .rename(columns=rename_cols)
        .pipe(gpd.GeoDataFrame)  # convert to GeoDataFrame
        .sort_values(by=["user_id", "pinged_at"], ignore_index=True)
    )
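
For reference, here is an illustrative call on a tiny synthetic frame (hypothetical user ID and central-London coordinates) showing the input schema the function expects:

# illustrative only: two hypothetical pings for one user
sample = pd.DataFrame(
    {
        "user_id": ["u1", "u1"],
        "datetime": pd.to_datetime(["2018-01-08 09:00:00", "2018-01-08 09:05:00"]),
        "lat": [51.5074, 51.5080],
        "lon": [-0.1278, -0.1270],
    }
)
process_pings(sample, check_data_quality=True)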

Process/Load Data#

url = getpass()  # prompt for the (private) data URL so it isn't stored in the notebook
out_path = Path("../data/gps.parquet.gzip")

if not out_path.exists():
    out_path.parent.mkdir(exist_ok=True)
    df = fetch_csv(url)
    df.pipe(process_pings, check_data_quality=True).to_parquet(out_path, compression="gzip")
    del df

print(f"{out_path}: {out_path.stat().st_size / 1e6:.2f} MB (compressed)")
gdf = gpd.read_parquet(out_path)
../data/gps.parquet.gzip: 33.44 MB (compressed)

Exploratory Data Analysis#

gdf.head(3)
user_id pinged_at geometry
0 0001347E-B3AD-4B84-80B9-14312A5CDBF4 2018-01-12 13:33:17 POINT (-0.11597 51.49539)
1 0001347E-B3AD-4B84-80B9-14312A5CDBF4 2018-01-12 13:38:22 POINT (-0.10594 51.49857)
2 0001347E-B3AD-4B84-80B9-14312A5CDBF4 2018-01-12 13:43:24 POINT (-0.09604 51.49963)
gdf["pinged_at"].describe().to_frame().T
count mean min 25% 50% 75% max
pinged_at 1770011 2018-01-14 03:25:27.649975 2018-01-08 00:00:20 2018-01-10 16:56:02 2018-01-15 04:41:03 2018-01-17 15:44:51 2018-01-19 23:59:41
users = gdf["user_id"].nunique()
pings = gdf.shape[0]

print(f"The data contains {pings:,} pings for {users:,} unique users.")
The data contains 1,770,011 pings for 31,606 unique users.

Pings Per User#

user_pings = (
    gdf.groupby("user_id", as_index=False)
    .size()
    .rename(columns={"size": "pings"})
    .sort_values(by="pings", ascending=False)
)
user_pings.describe(percentiles=[0.1, 0.25, 0.5, 0.75, 0.8, 0.95, 0.99, 0.999]).round(2).T
count mean std min 10% 25% 50% 75% 80% 95% 99% 99.9% max
pings 31606.0 56.0 57.15 20.0 22.0 27.0 39.0 65.0 74.0 145.0 257.0 548.37 3430.0
fig = px.ecdf(
    user_pings,
    x="pings",
    title="ECDF of User Pings (aka x% of users have at least n pings)",
)

# an ECDF is usually reported as a probability (0-1); for readability, format the axis as %
fig.update_layout(
    xaxis_title="Number of Pings",
    yaxis_title="% of Users",
    yaxis_tickformat=".0%",
    showlegend=False,
)

fig.show()

Pings Per Day#

grouper = [pd.Grouper(key="pinged_at", freq="D")]  # group by calendar day; like a resample, empty days appear with zero pings

ping_dates = (
    gdf.groupby(grouper, as_index=False)
    .size()
    .rename(columns={"size": "pings"})
    .sort_values(by="pinged_at")
    .assign(
        day_name=lambda df: df["pinged_at"].dt.day_name(),
        week_number=lambda df: df["pinged_at"].dt.isocalendar().week,
    )
)
ping_dates.plot(x="pinged_at", y="pings", title="Daily Pings")
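
Because a time-based pd.Grouper behaves like a resample, any day in the covered range without pings would still appear as a zero-count row. A quick illustrative check:

ping_dates.query("pings == 0")  # days (if any) with no pings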

Week over Week Comparison#

ping_dates.query("pings > 0").plot(x="day_name", y="pings", color="week_number", title="Daily Pings by Week")

Examine Spatial Extent & Macro Cluster Locations#

Here we see the overall spatial extent of the data, as well as the locations of point of interest (POI) clusters. The POI clusters are identified with the DBSCAN algorithm, which groups together points that have enough close neighbours within a given radius and labels the remainder as noise.

Major tube and train stations, together with high-traffic road corridors, form the main clusters.

# two layers, a bounding box of the spatial extent (of all data) and some randomly sampled points
total_spatial_extent = gpd.GeoDataFrame(geometry=[box(*gdf.total_bounds)], crs=gdf.crs)
sampled_points = gdf.sample(10_000, random_state=42).drop(columns=["user_id", "pinged_at"])
# cluster the sampled points to identify areas of high density (i.e. find hotspots)
# this is macro level clustering, without considering time or each user's individual patterns
kms_per_radian = 6371.0088  # radius of the Earth in kilometers
distance_km = 1
epsilon = distance_km / kms_per_radian  # convert to radians

dbscan = DBSCAN(eps=epsilon, min_samples=20, metric="haversine", algorithm="ball_tree")

# sklearn's haversine metric expects [lat, lon] pairs in radians, matching the eps computed above
coordinates = np.radians(np.column_stack((sampled_points["geometry"].y, sampled_points["geometry"].x)))
sampled_points["cluster"] = dbscan.fit_predict(coordinates)
# create a map
m = sampled_points.explore(
    name="Sampled Points", column="cluster", categorical=True, legend=False  # too many clusters to be useful
)

total_spatial_extent.explore(m=m, name="Total Spatial Extent", color="grey")
folium.LayerControl().add_to(m)
m
(Interactive folium map: sampled points coloured by DBSCAN cluster, with the total spatial extent overlaid.)

Calculate Dwell Times#

The first step is to group pings into user ‘journeys’ using sensible time and distance cutoffs between subsequent pings. In other words, pings that are close in both time and space are grouped into a single ‘journey’.

def calculate_dwell_attributes(
    gdf: gpd.GeoDataFrame, distance_threshold: float = 600, time_threshold: float = 600
) -> gpd.GeoDataFrame:
    """
    Identifies dwells in a GeoDataFrame by comparing consecutive points against distance and time thresholds.
    Adds columns for distance/time differences, dwell flag, and dwell IDs.

    Parameters:
    - gdf: GeoDataFrame with 'geometry' and 'pinged_at' columns.
    - distance_threshold: Distance limit in metres to define a dwell.
    - time_threshold: Time limit in seconds to define a dwell.

    Returns: GeoDataFrame with added dwell-related columns.
    """
    return gdf.assign(
        lag_distance_diff=lambda df: df["geometry"].distance(df["geometry"].shift()),
        lag_time_diff=lambda df: df["pinged_at"].diff().dt.total_seconds(),
        is_potential_dwell=lambda df: (
            # apply small threshold to remove consecutive pings with no movement / time difference
            (df["lag_time_diff"] > 0.01)
            & (df["lag_distance_diff"] > 0.01)
            # require the gap to be within both thresholds to flag a potential dwell
            & (df["lag_distance_diff"] < distance_threshold)
            & (df["lag_time_diff"] < time_threshold)
        ),
        dwell_id=lambda df: (~df["is_potential_dwell"]).cumsum(),
    )
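
The dwell_id column uses a common pandas segmentation trick: cumulative-summing the negated flag starts a new ID at every break, so each run of consecutive dwell pings shares an ID with the break ping that opened its group. A toy illustration (hypothetical flags):

flags = pd.Series([False, True, True, False, True, False])  # stand-in for is_potential_dwell
(~flags).cumsum()  # -> 1, 1, 1, 2, 2, 3: runs between breaks share a dwell_id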
h3_resolution = 10  # res 10 hexagons are roughly 15,000 m^2 on average (~66 m edge length)
h3_col_name = f"h3_{h3_resolution:02d}"
cols = gdf.columns.to_list() + [h3_col_name]

dwells = (  # gdf already sorted by user_id and pinged_at
    gdf.h3.geo_to_h3(resolution=h3_resolution, set_index=False)
    .to_crs("EPSG:27700")  # use British National Grid for distance calculations (metres)
    .groupby(["user_id"], group_keys=False, sort=False, as_index=False)[cols]
    .apply(calculate_dwell_attributes)
    .groupby(["user_id", "dwell_id", h3_col_name], as_index=False)
    .agg(
        dwell_start=("pinged_at", "min"),
        dwell_end=("pinged_at", "max"),
        dwell_pings=("pinged_at", "count"),
        geometries=("geometry", list),
    )
    .query("dwell_pings > 1")  # at least two pings (per 'user-journey') to define a dwell within a single H3 Index
    .assign(dwell_duration_seconds=lambda df: (df["dwell_end"] - df["dwell_start"]).dt.total_seconds())
)
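
A couple of cheap invariants are worth asserting on the result (illustrative checks, not part of the pipeline above):

# every dwell should have at least two pings and a non-negative duration
assert (dwells["dwell_pings"] > 1).all()
assert (dwells["dwell_duration_seconds"] >= 0).all()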

Distribution of pings per dwell (values rounded to the nearest 10):

dwells["dwell_pings"].round(-1).value_counts().to_frame().sort_index()
count
dwell_pings
0 170309
10 1718
20 105
30 29
40 14
50 4
60 3
70 3
80 4
90 1
100 1
110 4
140 2
150 2
260 1
350 1
430 1
dwells.query("dwell_pings == dwell_pings.max()")
user_id dwell_id h3_10 dwell_start dwell_end dwell_pings geometries dwell_duration_seconds
406827 435771D7-6329-4664-8C4A-C1366A1CDDB0 52 8a194ad148e7fff 2018-01-09 23:33:45 2018-01-10 04:44:43 431 [POINT (530981.4749883055 179168.01740387268),... 18658.0

Visualise Dwells#

Single-user dwell visualisation (the user with the largest single dwell, identified above)

(
    dwells.query('user_id == "435771D7-6329-4664-8C4A-C1366A1CDDB0"')
    .explode(column="geometries")
    .pipe(gpd.GeoDataFrame)
    .set_geometry("geometries", crs="EPSG:27700")
    .assign(dwell_start=lambda df: df["dwell_start"].astype(str), dwell_end=lambda df: df["dwell_end"].astype(str))  # stringify timestamps for folium tooltips
    .explore(column="dwell_id", categorical=True)
)
(Interactive folium map: the selected user's dwell points coloured by dwell_id.)

High Dwell Ping Count Visualisation

(
    dwells.query("dwell_pings > 6")
    .explode(column="geometries")
    .pipe(gpd.GeoDataFrame)
    .set_geometry("geometries", crs="EPSG:27700")
    .drop(columns=["dwell_start", "dwell_end"])
    .explore(column="dwell_id", categorical=True, legend=False)
)
(Interactive folium map: dwells with more than 6 pings, coloured by dwell_id.)

Average Dwell Times Across H3 Cells#

h3_avg_dwells = (
    dwells.groupby([h3_col_name], as_index=True)
    .agg(avg_dwell_duration_seconds=("dwell_duration_seconds", "mean"))
    .h3.h3_to_geo_boundary()
)
h3_avg_dwells.query("avg_dwell_duration_seconds > 100").explore(
    column="avg_dwell_duration_seconds", scheme="NaturalBreaks"
)
(Interactive folium map: H3 cell choropleth of average dwell duration.)